/*
 * Copyright (c) 2016 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl.
 *
 * Change Date: 2019-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */

/**
 * @file service.c  - A representation of the service within the gateway.
 *
 * @verbatim
 * Revision History
 *
 * Date         Who                     Description
 * 18/06/13     Mark Riddoch            Initial implementation
 * 24/06/13     Massimiliano Pinto      Added: Loading users from mysql backend in serviceStart
 * 06/02/14     Massimiliano Pinto      Added: serviceEnableRootUser routine
 * 25/02/14     Massimiliano Pinto      Added: service refresh limit feature
 * 28/02/14     Massimiliano Pinto      users_alloc moved from service_alloc to
 *                                      serviceStartPort (generic hashable for services)
 * 07/05/14     Massimiliano Pinto      Added: version_string initialized to NULL
 * 23/05/14     Mark Riddoch            Addition of service validation call
 * 29/05/14     Mark Riddoch            Filter API implementation
 * 09/09/14     Massimiliano Pinto      Added service option for localhost authentication
 * 13/10/14     Massimiliano Pinto      Added hashtable for resources (i.e database names for MySQL services)
 * 06/02/15     Mark Riddoch            Added caching of authentication data
 * 18/02/15     Mark Riddoch            Added result set management
 * 03/03/15     Massimiliano Pinto      Added config_enable_feedback_task() call in serviceStartAll
 * 19/06/15     Martin Brampton         More meaningful names for temp variables
 * 31/05/16     Martin Brampton         Implement connection throttling
 *
 * @endverbatim
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <session.h>
#include <service.h>
#include <gw_protocol.h>
#include <listener.h>
#include <server.h>
#include <router.h>
#include <spinlock.h>
#include <modules.h>
#include <dcb.h>
#include <users.h>
#include <filter.h>
#include <dbusers.h>
#include <maxscale/poll.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <housekeeper.h>
#include <resultset.h>
#include <gw.h>
#include <gwdirs.h>
#include <math.h>
#include <version.h>
#include <queuemanager.h>

/** To be used with configuration type checks */
typedef struct typelib_st
{
    int          tl_nelems;
    const char*  tl_name;
    const char** tl_p_elems;
} typelib_t;

/** Set of subsequent false,true pairs */
static const char* bool_strings[11]  = {"FALSE", "TRUE", "OFF", "ON", "N", "Y", "0", "1", "NO", "YES", 0};
typelib_t bool_type  = {array_nelems(bool_strings) - 1, "bool_type", bool_strings};

/** List of valid values */
static const char* sqlvar_target_strings[4] = {"MASTER", "ALL", 0};
typelib_t sqlvar_target_type =
{
    array_nelems(sqlvar_target_strings) - 1,
    "sqlvar_target_type",
    sqlvar_target_strings
};

static SPINLOCK service_spin = SPINLOCK_INIT;
static SERVICE  *allServices = NULL;

static int find_type(typelib_t* tl, const char* needle, int maxlen);

static void service_add_qualified_param(SERVICE*          svc,
                                        CONFIG_PARAMETER* param);
static void service_internal_restart(void *data);

/**
 * Allocate a new service for the gateway to support
 *
 *
 * @param servname      The service name
 * @param router        Name of the router module this service uses
 *
 * @return              The newly created service or NULL if an error occurred
 */
SERVICE *
service_alloc(const char *servname, const char *router)
{
    SERVICE *service;

    if ((service = (SERVICE *)calloc(1, sizeof(SERVICE))) == NULL)
    {
        return NULL;
    }
    if ((service->router = load_module(router, MODULE_ROUTER)) == NULL)
    {
        char* home = get_libdir();
        char* ldpath = getenv("LD_LIBRARY_PATH");

        MXS_ERROR("Unable to load %s module \"%s\".\n\t\t\t"
                  "      Ensure that lib%s.so exists in one of the "
                  "following directories :\n\t\t\t      "
                  "- %s\n%s%s",
                  MODULE_ROUTER,
                  router,
                  router,
                  home,
                  ldpath ? "\t\t\t      - " : "",
                  ldpath ? ldpath : "");
        free(service);
        return NULL;
    }
    service->client_count = 0;
    service->name = strdup(servname);
    service->routerModule = strdup(router);
    service->users_from_all = false;
    service->queued_connections = NULL;
    service->resources = NULL;
    service->localhost_match_wildcard_host = SERVICE_PARAM_UNINIT;
    service->retry_start = true;
    service->conn_idle_timeout = SERVICE_NO_SESSION_TIMEOUT;
    service->weightby = NULL;
    service->credentials.authdata = NULL;
    service->credentials.name = NULL;
    service->version_string = NULL;
    service->svc_config_param = NULL;
    service->users = NULL;
    service->routerOptions = NULL;
    service->log_auth_warnings = true;
    service->strip_db_esc = true;
    if (service->name == NULL || service->routerModule == NULL)
    {
        if (service->name)
        {
            free(service->name);
        }
        free(service);
        return NULL;
    }
    service->stats.started = time(0);
    service->stats.n_failed_starts = 0;
    service->state = SERVICE_STATE_ALLOC;
    spinlock_init(&service->spin);
    spinlock_init(&service->users_table_spin);

    spinlock_acquire(&service_spin);
    service->next = allServices;
    allServices = service;
    spinlock_release(&service_spin);

    return service;
}

/**
 * Check to see if a service pointer is valid
 *
 * @param service       The pointer to check
 * @return 1 if the service is in the list of all services
 */
int
service_isvalid(SERVICE *service)
{
    SERVICE *checkservice;
    int rval = 0;

    spinlock_acquire(&service_spin);
    checkservice = allServices;
    while (checkservice)
    {
        if (checkservice == service)
        {
            rval = 1;
            break;
        }
        checkservice = checkservice->next;
    }
    spinlock_release(&service_spin);
    return rval;
}

/**
 * Start an individual port/protocol pair
 *
 * @param service       The service
 * @param port          The port to start
 * @return              The number of listeners started
 */
static int
serviceStartPort(SERVICE *service, SERV_LISTENER *port)
{
    int listeners = 0;
    char config_bind[40];
    GWPROTOCOL *funcs;

    if (service == NULL || service->router == NULL || service->router_instance == NULL)
    {
        /* Should never happen, this guarantees it can't */
        MXS_ERROR("Attempt to start port with null or incomplete service");
        goto retblock;
    }

    port->listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER, port);

    if (port->listener == NULL)
    {
        MXS_ERROR("Failed to create listener for service %s.", service->name);
        goto retblock;
    }

    if (port->ssl)
    {
        listener_init_SSL(port->ssl);
    }

    if (strcmp(port->protocol, "MySQLClient") == 0)
    {
        int loaded;

        if (service->users == NULL)
        {
            /*
             * Allocate specific data for MySQL users
             * including hosts and db names
             */
            service->users = mysql_users_alloc();

            if ((loaded = load_mysql_users(service)) < 0)
            {
                MXS_ERROR("Unable to load users for "
                          "service %s listening at %s:%d.",
                          service->name,
                          (port->address == NULL ? "0.0.0.0" : port->address),
                          port->port);

                {
                    /* Try loading authentication data from file cache */
                    char *ptr, path[PATH_MAX + 1];
                    strncpy(path, get_cachedir(), sizeof(path) - 1);
                    strncat(path, "/", sizeof(path) - 1);
                    strncat(path, service->name, sizeof(path) - 1);
                    strncat(path, "/.cache/dbusers", sizeof(path) - 1);
                    loaded = dbusers_load(service->users, path);
                    if (loaded != -1)
                    {
                        MXS_ERROR("Using cached credential information.");
                    }
                }
                if (loaded == -1)
                {
                    dcb_close(port->listener);
                    port->listener = NULL;
                    goto retblock;
                }
            }
            else
            {
                /* Save authentication data to file cache */
                char *ptr, path[PATH_MAX + 1];
                int mkdir_rval = 0;
                strncpy(path, get_cachedir(), PATH_MAX);
                strncat(path, "/", 4096);
                strncat(path, service->name, PATH_MAX);
                if (access(path, R_OK) == -1)
                {
                    mkdir_rval = mkdir(path, 0777);
                }

                if (mkdir_rval)
                {
                    if (errno != EEXIST)
                    {
                        char errbuf[STRERROR_BUFLEN];
                        MXS_ERROR("Failed to create directory '%s': [%d] %s",
                                  path,
                                  errno,
                                  strerror_r(errno, errbuf, sizeof(errbuf)));
                    }
                    mkdir_rval = 0;
                }

                strncat(path, "/.cache", PATH_MAX);
                if (access(path, R_OK) == -1)
                {
                    mkdir_rval = mkdir(path, 0777);
                }

                if (mkdir_rval)
                {
                    if (errno != EEXIST)
                    {
                        char errbuf[STRERROR_BUFLEN];
                        MXS_ERROR("Failed to create directory '%s': [%d] %s",
                                  path,
                                  errno,
                                  strerror_r(errno, errbuf, sizeof(errbuf)));
                    }
                    mkdir_rval = 0;
                }
                strncat(path, "/dbusers", PATH_MAX);
                dbusers_save(service->users, path);
            }
            if (loaded == 0)
            {
                MXS_ERROR("Service %s: failed to load any user "
                          "information. Authentication will "
                          "probably fail as a result.",
                          service->name);
            }

            /* At service start last update is set to USERS_REFRESH_TIME seconds earlier.
             * This way MaxScale could try reloading users' just after startup
             */
            service->rate_limit.last = time(NULL) - USERS_REFRESH_TIME;
            service->rate_limit.nloads = 1;

            MXS_NOTICE("Loaded %d MySQL Users for service [%s].",
                       loaded, service->name);
        }
    }
    else
    {
        if (service->users == NULL)
        {
            /* Generic users table */
            service->users = users_alloc();
        }
    }

    if ((funcs = (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL)
    {
        dcb_close(port->listener);
        port->listener = NULL;
        MXS_ERROR("Unable to load protocol module %s. Listener "
                  "for service %s not started.",
                  port->protocol,
                  service->name);
        goto retblock;
    }
    memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL));

    if (port->address)
    {
        sprintf(config_bind, "%s:%d", port->address, port->port);
    }
    else
    {
        sprintf(config_bind, "0.0.0.0:%d", port->port);
    }

    if (port->listener->func.listen(port->listener, config_bind))
    {
        port->listener->session = session_alloc(service, port->listener);

        if (port->listener->session != NULL)
        {
            port->listener->session->state = SESSION_STATE_LISTENER;
            listeners += 1;
        }
        else
        {
            MXS_ERROR("Failed to create session to service %s.",
                      service->name);

            dcb_close(port->listener);
            port->listener = NULL;
            goto retblock;
        }
    }
    else
    {
        MXS_ERROR("Unable to start to listen port %d for %s %s.",
                  port->port,
                  port->protocol,
                  service->name);
        dcb_close(port->listener);
        port->listener = NULL;
    }

retblock:
    return listeners;
}

/**
 * Start all ports for a service.
 * serviceStartAllPorts will try to start all listeners associated with the service.
 * If no listeners are started, the starting of ports will be retried after a period of time.
 * @param service Service to start
 * @return Number of started listeners. This is equal to the number of ports the service
 * is listening to.
 */
int serviceStartAllPorts(SERVICE* service)
{
    SERV_LISTENER *port = service->ports;
    int listeners = 0;

    if (port)
    {
        while (!service->svc_do_shutdown && port)
        {
            listeners += serviceStartPort(service, port);
            port = port->next;
        }

        if (listeners)
        {
            service->state = SERVICE_STATE_STARTED;
            service->stats.started = time(0);
        }
        else if (service->retry_start)
        {
            /** Service failed to start any ports. Try again later. */
            service->stats.n_failed_starts++;
            char taskname[strlen(service->name) + strlen("_start_retry_") +
                          (int) ceil(log10(INT_MAX)) + 1];
            int retry_after = MIN(service->stats.n_failed_starts * 10, SERVICE_MAX_RETRY_INTERVAL);
            snprintf(taskname, sizeof(taskname), "%s_start_retry_%d",
                     service->name, service->stats.n_failed_starts);
            hktask_oneshot(taskname, service_internal_restart,
                           (void*) service, retry_after);
            MXS_NOTICE("Failed to start service %s, retrying in %d seconds.",
                       service->name, retry_after);

            /** This will prevent MaxScale from shutting down if service start is retried later */
            listeners = 1;
        }
    }
    else
    {
        MXS_WARNING("Service '%s' has no listeners defined.", service->name);
        listeners = 1; /** Set this to one to suppress errors */
    }

    return listeners;
}

/** Helper function for copying an array of strings */
static char** copy_string_array(char** original)
{
    char **array = NULL;

    if (original)
    {
        int values = 0;

        while (original[values])
        {
            values++;
        }

        array = malloc(sizeof(char*) * (values + 1));

        if (array)
        {
            for (int i = 0; i < values; i++)
            {
                array[i] = strdup(original[i]);
            }
            array[values] = NULL;
        }
    }
    return array;
}

/** Helper function for freeing an array of strings */
static void free_string_array(char** array)
{
    if (array)
    {
        for (int i = 0; array[i]; i++)
        {
            free(array[i]);
        }
        free(array);
    }
}

/**
 * Start a service
 *
 * This function loads the protocol modules for each port on which the
 * service listens and starts the listener on that port
 *
 * Also create the router_instance for the service.
 *
 * @param service       The Service that should be started
 * @return      Returns the number of listeners created
 */
int
serviceStart(SERVICE *service)
{
    int listeners = 0;

    if (check_service_permissions(service))
    {
        char **router_options = copy_string_array(service->routerOptions);
        if ((service->router_instance = service->router->createInstance(
                                            service, router_options)))
        {
            listeners += serviceStartAllPorts(service);
        }
        else
        {
            MXS_ERROR("%s: Failed to create router instance for service. Service not started.",
                      service->name);
            service->state = SERVICE_STATE_FAILED;
        }
        free_string_array(router_options);
    }
    else
    {
        MXS_ERROR("%s: Inadequate user permissions for service. Service not started.",
                  service->name);
        service->state = SERVICE_STATE_FAILED;
    }
    return listeners;
}

/**
 * Start an individual listener
 *
 * @param service       The service to start the listener for
 * @param protocol      The name of the protocol
 * @param port          The port number
 */
void
serviceStartProtocol(SERVICE *service, char *protocol, int port)
{
    SERV_LISTENER *ptr;

    ptr = service->ports;
    while (ptr)
    {
        if (strcmp(ptr->protocol, protocol) == 0 && ptr->port == port)
        {
            serviceStartPort(service, ptr);
        }
        ptr = ptr->next;
    }
}


/**
 * Start all the services
 *
 * @return Return the number of services started
 */
int
serviceStartAll()
{
    SERVICE *ptr;
    int n = 0, i;
    bool error = false;

    config_enable_feedback_task();

    ptr = allServices;
    while (ptr && !ptr->svc_do_shutdown)
    {
        n += (i = serviceStart(ptr));

        if (i == 0)
        {
            MXS_ERROR("Failed to start service '%s'.", ptr->name);
            error = true;
        }

        ptr = ptr->next;
    }
    return error ? 0 : n;
}

/**
 * Stop a service
 *
 * This function stops the listener for the service
 *
 * @param service       The Service that should be stopped
 * @return      Returns the number of listeners restarted
 */
int
serviceStop(SERVICE *service)
{
    SERV_LISTENER *port;
    int listeners = 0;

    port = service->ports;
    while (port)
    {
        if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER)
        {
            if (poll_remove_dcb(port->listener) == 0)
            {
                port->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
                listeners++;
            }
        }
        port = port->next;
    }
    service->state = SERVICE_STATE_STOPPED;

    return listeners;
}

/**
 * Restart a service
 *
 * This function stops the listener for the service
 *
 * @param service       The Service that should be restarted
 * @return      Returns the number of listeners restarted
 */
int
serviceRestart(SERVICE *service)
{
    SERV_LISTENER *port;
    int listeners = 0;

    port = service->ports;
    while (port)
    {
        if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED)
        {
            if (poll_add_dcb(port->listener) == 0)
            {
                port->listener->session->state = SESSION_STATE_LISTENER;
                listeners++;
            }
        }
        port = port->next;
    }
    service->state = SERVICE_STATE_STARTED;
    return listeners;
}


/**
 * Deallocate the specified service
 *
 * @param service       The service to deallocate
 * @return      Returns true if the service was freed
 */
int
service_free(SERVICE *service)
{
    SERVICE *ptr;
    SERVER_REF *srv;
    if (service->stats.n_current)
    {
        return 0;
    }
    /* First of all remove from the linked list */
    spinlock_acquire(&service_spin);
    if (allServices == service)
    {
        allServices = service->next;
    }
    else
    {
        ptr = allServices;
        while (ptr && ptr->next != service)
        {
            ptr = ptr->next;
        }
        if (ptr)
        {
            ptr->next = service->next;
        }
    }
    spinlock_release(&service_spin);

    /* Clean up session and free the memory */
    while (service->dbref)
    {
        srv = service->dbref;
        service->dbref = service->dbref->next;
        free(srv);
    }

    free(service->name);
    free(service->routerModule);
    free(service->weightby);
    free(service->version_string);
    free(service->credentials.name);
    free(service->credentials.authdata);

    free_config_parameter(service->svc_config_param);
    users_free(service->users);
    hashtable_free(service->resources);
    serviceClearRouterOptions(service);

    free(service);
    return 1;
}

/**
 * Add a protocol/port pair to the service
 *
 * @param service       The service
 * @param protocol      The name of the protocol module
 * @param address       The address to listen with
 * @param port          The port to listen on
 * @param authenticator Name of the authenticator to be used
 * @param ssl           SSL configuration
 * @return      TRUE if the protocol/port could be added
 */
int
serviceAddProtocol(SERVICE *service, char *protocol, char *address, unsigned short port, char *authenticator,
                   SSL_LISTENER *ssl)
{
    SERV_LISTENER   *proto;

    if ((proto = listener_alloc(protocol, address, port, authenticator, ssl)) != NULL)
    {
        spinlock_acquire(&service->spin);
        proto->next = service->ports;
        service->ports = proto;
        spinlock_release(&service->spin);
        return 1;
    }

    return 0;
}

/**
 * Check if a protocol/port pair is part of the service
 *
 * @param service       The service
 * @param protocol      The name of the protocol module
 * @param address       The address to listen on
 * @param port          The port to listen on
 * @return      TRUE if the protocol/port is already part of the service
 */
int serviceHasProtocol(SERVICE *service, const char *protocol,
                       const char* address, unsigned short port)
{
    SERV_LISTENER *proto;

    spinlock_acquire(&service->spin);
    proto = service->ports;
    while (proto)
    {
        if (strcmp(proto->protocol, protocol) == 0 && proto->port == port &&
            ((address && proto->address && strcmp(proto->address, address) == 0) ||
             (address == NULL && proto->address == NULL)))
        {
            break;
        }
        proto = proto->next;
    }
    spinlock_release(&service->spin);

    return proto != NULL;
}

/**
 * Add a backend database server to a service
 *
 * @param service       The service to add the server to
 * @param server        The server to add
 */
void
serviceAddBackend(SERVICE *service, SERVER *server)
{
    SERVER_REF *sref = malloc(sizeof(SERVER_REF));

    if (sref)
    {
        sref->next = NULL;
        sref->server = server;

        spinlock_acquire(&service->spin);
        if (service->dbref)
        {
            SERVER_REF *ref = service->dbref;
            while (ref->next)
            {
                ref = ref->next;
            }
            ref->next = sref;
        }
        else
        {
            service->dbref = sref;
        }
        spinlock_release(&service->spin);
    }
}

/**
 * Test if a server is part of a service
 *
 * @param service       The service to add the server to
 * @param server        The server to add
 * @return              Non-zero if the server is already part of the service
 */
int
serviceHasBackend(SERVICE *service, SERVER *server)
{
    SERVER_REF *ptr;

    spinlock_acquire(&service->spin);
    ptr = service->dbref;
    while (ptr && ptr->server != server)
    {
        ptr = ptr->next;
    }
    spinlock_release(&service->spin);

    return ptr != NULL;
}

/**
 * Add a router option to a service
 *
 * @param service       The service to add the router option to
 * @param option        The option string
 */
void
serviceAddRouterOption(SERVICE *service, char *option)
{
    int i;

    spinlock_acquire(&service->spin);
    if (service->routerOptions == NULL)
    {
        service->routerOptions = (char **)calloc(2, sizeof(char *));
        service->routerOptions[0] = strdup(option);
        service->routerOptions[1] = NULL;
    }
    else
    {
        for (i = 0; service->routerOptions[i]; i++)
        {
            ;
        }
        service->routerOptions = (char **)realloc(service->routerOptions,
                                                  (i + 2) * sizeof(char *));
        service->routerOptions[i] = strdup(option);
        service->routerOptions[i + 1] = NULL;
    }
    spinlock_release(&service->spin);
}

/**
 * Remove the router options for the service
 *
 * @param       service The service to remove the options from
 */
void
serviceClearRouterOptions(SERVICE *service)
{
    int i;

    spinlock_acquire(&service->spin);
    if (service->routerOptions != NULL)
    {
        for (i = 0; service->routerOptions[i]; i++)
        {
            free(service->routerOptions[i]);
        }
        free(service->routerOptions);
        service->routerOptions = NULL;
    }
    spinlock_release(&service->spin);
}
/**
 * Set the service user that is used to log in to the backebd servers
 * associated with this service.
 *
 * @param service       The service we are setting the data for
 * @param user          The user name to use for connections
 * @param auth          The authentication data we need, e.g. MySQL SHA1 password
 * @return      0 on failure
 */
int
serviceSetUser(SERVICE *service, char *user, char *auth)
{
    if (service->credentials.name)
    {
        free(service->credentials.name);
    }
    if (service->credentials.authdata)
    {
        free(service->credentials.authdata);
    }
    service->credentials.name = strdup(user);
    service->credentials.authdata = strdup(auth);

    if (service->credentials.name == NULL || service->credentials.authdata == NULL)
    {
        return 0;
    }
    return 1;
}


/**
 * Get the service user that is used to log in to the backebd servers
 * associated with this service.
 *
 * @param service       The service we are setting the data for
 * @param user          The user name to use for connections
 * @param auth          The authentication data we need, e.g. MySQL SHA1 password
 * @return              0 on failure
 */
int
serviceGetUser(SERVICE *service, char **user, char **auth)
{
    if (service->credentials.name == NULL || service->credentials.authdata == NULL)
    {
        return 0;
    }
    *user = service->credentials.name;
    *auth = service->credentials.authdata;
    return 1;
}

/**
 * Enable/Disable root user for this service
 * associated with this service.
 *
 * @param service       The service we are setting the data for
 * @param action        1 for root enable, 0 for disable access
 * @return              0 on failure
 */

int
serviceEnableRootUser(SERVICE *service, int action)
{
    if (action != 0 && action != 1)
    {
        return 0;
    }

    service->enable_root = action;

    return 1;
}

/**
 * Enable/Disable loading the user data from only one server or all of them
 *
 * @param service       The service we are setting the data for
 * @param action        1 for all servers, 0 for single server
 * @return              0 on failure
 */

int
serviceAuthAllServers(SERVICE *service, int action)
{
    if (action != 0 && action != 1)
    {
        return 0;
    }

    service->users_from_all = action;

    return 1;
}

/**
 * Whether to strip escape characters from the name of the database the client
 * is connecting to.
 * @param service Service to configure
 * @param action 0 for disabled, 1 for enabled
 * @return 1 if successful, 0 on error
 */
int serviceStripDbEsc(SERVICE* service, int action)
{
    if (action != 0 && action != 1)
    {
        return 0;
    }

    service->strip_db_esc = action;

    return 1;
}


/**
 * Sets the session timeout for the service.
 * @param service Service to configure
 * @param val Timeout in seconds
 * @return 1 on success, 0 when the value is invalid
 */
int
serviceSetTimeout(SERVICE *service, int val)
{

    if (val < 0)
    {
        return 0;
    }

    /** Enable the session timeout checks if and only if at least one service is
     * configured with a idle timeout. */
    if ((service->conn_idle_timeout = val))
    {
        enable_session_timeouts();
    }

    return 1;
}

/**
 * Sets the connection limits, if any, for the service.
 * @param service Service to configure
 * @param max The maximum number of client connections at any one time
 * @param queued    The maximum number of connections to queue up when
 *                  max_connections clients are already connected
 * @return 1 on success, 0 when the values are invalid
 */
int
serviceSetConnectionLimits(SERVICE *service, int max, int queued, int timeout)
{

    if (max < 0 || queued < 0)
    {
        return 0;
    }

    service->max_connections = max;
    if (queued && timeout)
    {
        /* If memory allocation fails, result will be null so no queue */
        service->queued_connections = mxs_queue_alloc(queued, timeout);
    }

    return 1;
}

/**
 * Enable or disable the restarting of the service on failure.
 * @param service Service to configure
 * @param value A string representation of a boolean value
 */
void serviceSetRetryOnFailure(SERVICE *service, char* value)
{
    if (value)
    {
        service->retry_start = config_truth_value(value);
    }
}

/**
 * Set the filters used by the service
 *
 * @param service       The service itself
 * @param filters       ASCII string of filters to use
 * @return True if loading and creating all filters was successful. False if a
 * filter module was not found or the instance creation failed.
 */
bool
serviceSetFilters(SERVICE *service, char *filters)
{
    FILTER_DEF **flist;
    char *ptr, *brkt;
    int n = 0;
    bool rval = true;

    if ((flist = (FILTER_DEF **) malloc(sizeof(FILTER_DEF *))) == NULL)
    {
        MXS_ERROR("Out of memory adding filters to service.\n");
        return false;
    }
    ptr = strtok_r(filters, "|", &brkt);
    while (ptr)
    {
        n++;
        FILTER_DEF **tmp;
        if ((tmp = (FILTER_DEF **) realloc(flist,
                                           (n + 1) * sizeof(FILTER_DEF *))) == NULL)
        {
            MXS_ERROR("Out of memory adding filters to service.");
            rval = false;
            break;
        }

        flist = tmp;
        char *filter_name = trim(ptr);

        if ((flist[n - 1] = filter_find(filter_name)))
        {
            if (!filter_load(flist[n - 1]))
            {
                MXS_ERROR("Failed to load filter '%s' for service '%s'.",
                          filter_name, service->name);
                rval = false;
                break;
            }
        }
        else
        {
            MXS_ERROR("Unable to find filter '%s' for service '%s'\n",
                      filter_name, service->name);
            rval = false;
            break;
        }

        flist[n] = NULL;
        ptr = strtok_r(NULL, "|", &brkt);
    }

    if (rval)
    {
        service->filters = flist;
        service->n_filters = n;
    }
    else
    {
        free(flist);
    }

    return rval;
}

/**
 * Return a named service
 *
 * @param servname      The name of the service to find
 * @return The service or NULL if not found
 */
SERVICE *
service_find(char *servname)
{
    SERVICE *service;

    spinlock_acquire(&service_spin);
    service = allServices;
    while (service && strcmp(service->name, servname) != 0)
    {
        service = service->next;
    }
    spinlock_release(&service_spin);

    return service;
}


/**
 * Print details of an individual service
 *
 * @param service       Service to print
 */
void
printService(SERVICE *service)
{
    SERVER_REF  *ptr = service->dbref;
    struct tm result;
    char time_buf[30];
    int i;

    printf("Service %p\n", (void *)service);
    printf("\tService:                              %s\n", service->name);
    printf("\tRouter:                               %s (%p)\n",
           service->routerModule, (void *)service->router);
    printf("\tStarted:              %s",
           asctime_r(localtime_r(&service->stats.started, &result), time_buf));
    printf("\tBackend databases\n");
    while (ptr)
    {
        printf("\t\t%s:%d  Protocol: %s\n", ptr->server->name, ptr->server->port, ptr->server->protocol);
        ptr = ptr->next;
    }
    if (service->n_filters)
    {
        printf("\tFilter chain:         ");
        for (i = 0; i < service->n_filters; i++)
        {
            printf("%s %s ", service->filters[i]->name,
                   i + 1 < service->n_filters ? "|" : "");
        }
        printf("\n");
    }
    printf("\tUsers data:           %p\n", (void *)service->users);
    printf("\tTotal connections:    %d\n", service->stats.n_sessions);
    printf("\tCurrently connected:  %d\n", service->stats.n_current);
}

/**
 * Print all services
 *
 * Designed to be called within a debugger session in order
 * to display all active services within the gateway
 */
void
printAllServices()
{
    SERVICE *ptr;

    spinlock_acquire(&service_spin);
    ptr = allServices;
    while (ptr)
    {
        printService(ptr);
        ptr = ptr->next;
    }
    spinlock_release(&service_spin);
}

/**
 * Print all services to a DCB
 *
 * Designed to be called within a CLI command in order
 * to display all active services within the gateway
 */
void
dprintAllServices(DCB *dcb)
{
    SERVICE *ptr;

    spinlock_acquire(&service_spin);
    ptr = allServices;
    while (ptr)
    {
        dprintService(dcb, ptr);
        ptr = ptr->next;
    }
    spinlock_release(&service_spin);
}

/**
 * Print details of a single service.
 *
 * @param dcb           DCB to print data to
 * @param service       The service to print
 */
void dprintService(DCB *dcb, SERVICE *service)
{
    SERVER_REF *server = service->dbref;
    struct tm result;
    char timebuf[30];
    int i;

    dcb_printf(dcb, "Service %p\n", service);
    dcb_printf(dcb, "\tService:                             %s\n",
               service->name);
    dcb_printf(dcb, "\tRouter:                              %s (%p)\n",
               service->routerModule, service->router);
    switch (service->state)
    {
        case SERVICE_STATE_STARTED:
            dcb_printf(dcb, "\tState:                               Started\n");
            break;
        case SERVICE_STATE_STOPPED:
            dcb_printf(dcb, "\tState:                               Stopped\n");
            break;
        case SERVICE_STATE_FAILED:
            dcb_printf(dcb, "\tState:                               Failed\n");
            break;
        case SERVICE_STATE_ALLOC:
            dcb_printf(dcb, "\tState:                               Allocated\n");
            break;
    }
    if (service->router && service->router_instance)
    {
        service->router->diagnostics(service->router_instance, dcb);
    }
    dcb_printf(dcb, "\tStarted:                             %s",
               asctime_r(localtime_r(&service->stats.started, &result), timebuf));
    dcb_printf(dcb, "\tRoot user access:                    %s\n",
               service->enable_root ? "Enabled" : "Disabled");
    if (service->n_filters)
    {
        dcb_printf(dcb, "\tFilter chain:                ");
        for (i = 0; i < service->n_filters; i++)
        {
            dcb_printf(dcb, "%s %s ", service->filters[i]->name,
                       i + 1 < service->n_filters ? "|" : "");
        }
        dcb_printf(dcb, "\n");
    }
    dcb_printf(dcb, "\tBackend databases:\n");
    while (server)
    {
        dcb_printf(dcb, "\t\t%s:%d  Protocol: %s\n", server->server->name, server->server->port,
                   server->server->protocol);
        server = server->next;
    }
    if (service->weightby)
    {
        dcb_printf(dcb, "\tRouting weight parameter:            %s\n",
                   service->weightby);
    }
    dcb_printf(dcb, "\tUsers data:                          %p\n",
               service->users);
    dcb_printf(dcb, "\tTotal connections:                   %d\n",
               service->stats.n_sessions);
    dcb_printf(dcb, "\tCurrently connected:                 %d\n",
               service->stats.n_current);
}

/**
 * List the defined services in a tabular format.
 *
 * @param dcb           DCB to print the service list to.
 */
void
dListServices(DCB *dcb)
{
    SERVICE *service;

    spinlock_acquire(&service_spin);
    service = allServices;
    if (service)
    {
        dcb_printf(dcb, "Services.\n");
        dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n");
        dcb_printf(dcb, "%-25s | %-20s | #Users | Total Sessions\n",
                   "Service Name", "Router Module");
        dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n");
    }
    while (service)
    {
        ss_dassert(service->stats.n_current >= 0);
        dcb_printf(dcb, "%-25s | %-20s | %6d | %5d\n",
                   service->name, service->routerModule,
                   service->stats.n_current, service->stats.n_sessions);
        service = service->next;
    }
    if (allServices)
    {
        dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n\n");
    }
    spinlock_release(&service_spin);
}

/**
 * List the defined listeners in a tabular format.
 *
 * @param dcb           DCB to print the service list to.
 */
void
dListListeners(DCB *dcb)
{
    SERVICE *service;
    SERV_LISTENER *lptr;

    spinlock_acquire(&service_spin);
    service = allServices;
    if (service)
    {
        dcb_printf(dcb, "Listeners.\n");
        dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n");
        dcb_printf(dcb, "%-20s | %-18s | %-15s | Port  | State\n",
                   "Service Name", "Protocol Module", "Address");
        dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n");
    }
    while (service)
    {
        lptr = service->ports;
        while (lptr)
        {
            dcb_printf(dcb, "%-20s | %-18s | %-15s | %5d | %s\n",
                       service->name, lptr->protocol,
                       (lptr && lptr->address) ? lptr->address : "*",
                       lptr->port,
                       (!lptr->listener ||
                        !lptr->listener->session ||
                        lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ?
                       "Stopped" : "Running");

            lptr = lptr->next;
        }
        service = service->next;
    }
    if (allServices)
    {
        dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n\n");
    }
    spinlock_release(&service_spin);
}

/**
 * Update the definition of a service
 *
 * @param service       The service to update
 * @param router        The router module to use
 * @param user          The user to use to extract information from the database
 * @param auth          The password for the user above
 */
void
service_update(SERVICE *service, char *router, char *user, char *auth)
{
    void *router_obj;

    if (!strcmp(service->routerModule, router))
    {
        if ((router_obj = load_module(router, MODULE_ROUTER)) == NULL)
        {
            MXS_ERROR("Failed to update router "
                      "for service %s to %s.",
                      service->name,
                      router);
        }
        else
        {
            MXS_NOTICE("Update router for service %s to %s.",
                       service->name,
                       router);
            free(service->routerModule);
            service->routerModule = strdup(router);
            service->router = router_obj;
        }
    }
    if (user &&
        (strcmp(service->credentials.name, user) != 0 ||
         strcmp(service->credentials.authdata, auth) != 0))
    {
        MXS_NOTICE("Update credentials for service %s.", service->name);
        serviceSetUser(service, user, auth);
    }
}

/**
 * Refresh the database users for the service
 * This function replaces the MySQL users used by the service with the latest
 * version found on the backend servers. There is a limit on how often the users
 * can be reloaded and if this limit is exceeded, the reload will fail.
 * @param service Service to reload
 * @return 0 on success and 1 on error
 */
int service_refresh_users(SERVICE *service)
{
    int ret = 1;
    /* check for another running getUsers request */
    if (!spinlock_acquire_nowait(&service->users_table_spin))
    {
        MXS_DEBUG("%s: [service_refresh_users] failed to get get lock for "
                  "loading new users' table: another thread is loading users",
                  service->name);

        return 1;
    }

    /* check if refresh rate limit has exceeded */
    if ((time(NULL) < (service->rate_limit.last + USERS_REFRESH_TIME)) ||
        (service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME))
    {
        spinlock_release(&service->users_table_spin);
        MXS_ERROR("%s: Refresh rate limit exceeded for load of users' table.",
                  service->name);

        return 1;
    }

    service->rate_limit.nloads++;

    /* update time and counter */
    if (service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME)
    {
        service->rate_limit.nloads = 1;
        service->rate_limit.last = time(NULL);
    }

    ret = replace_mysql_users(service);

    /* remove lock */
    spinlock_release(&service->users_table_spin);

    if (ret >= 0)
    {
        return 0;
    }
    else
    {
        return 1;
    }
}

bool service_set_param_value(SERVICE*            service,
                             CONFIG_PARAMETER*   param,
                             char*               valstr,
                             count_spec_t        count_spec,
                             config_param_type_t type)
{
    char* p;
    int valint;
    bool valbool;
    target_t valtarget;
    bool succp = true;

    if (PARAM_IS_TYPE(type, PERCENT_TYPE) || PARAM_IS_TYPE(type, COUNT_TYPE))
    {
        /**
         * Find out whether the value is numeric and ends with '%' or '\0'
         */
        p = valstr;

        while (isdigit(*p))
        {
            p++;
        }

        errno = 0;

        if (p == valstr || (*p != '%' && *p != '\0'))
        {
            succp = false;
        }
        else if (*p == '%')
        {
            if (*(p + 1) == '\0')
            {
                *p = '\0';
                valint = (int) strtol(valstr, (char **)NULL, 10);

                if (valint == 0 && errno != 0)
                {
                    succp = false;
                }
                else if (PARAM_IS_TYPE(type, PERCENT_TYPE))
                {
                    succp = true;
                    config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE);
                }
                else
                {
                    /** Log error */
                }
            }
            else
            {
                succp = false;
            }
        }
        else if (*p == '\0')
        {
            valint = (int) strtol(valstr, (char **)NULL, 10);

            if (valint == 0 && errno != 0)
            {
                succp = false;
            }
            else if (PARAM_IS_TYPE(type, COUNT_TYPE))
            {
                succp = true;
                config_set_qualified_param(param, (void *)&valint, COUNT_TYPE);
            }
            else
            {
                /** Log error */
            }
        }
    }
    else if (type == BOOL_TYPE)
    {
        unsigned int rc;

        rc = find_type(&bool_type, valstr, strlen(valstr) + 1);

        if (rc > 0)
        {
            succp = true;
            if (rc % 2 == 1)
            {
                valbool = false;
            }
            else if (rc % 2 == 0)
            {
                valbool = true;
            }
            /** add param to config */
            config_set_qualified_param(param, (void *)&valbool, BOOL_TYPE);
        }
        else
        {
            succp = false;
        }
    }
    else if (type == SQLVAR_TARGET_TYPE)
    {
        unsigned int rc;

        rc = find_type(&sqlvar_target_type, valstr, strlen(valstr) + 1);

        if (rc > 0 && rc < 3)
        {
            succp = true;
            if (rc == 1)
            {
                valtarget = TYPE_MASTER;
            }
            else if (rc == 2)
            {
                valtarget = TYPE_ALL;
            }
            /** add param to config */
            config_set_qualified_param(param, (void *)&valtarget, SQLVAR_TARGET_TYPE);
        }
        else
        {
            succp = false;
        }
    }

    if (succp)
    {
        service_add_qualified_param(service, param); /*< add param to svc */
    }
    return succp;
}
/*
 * Function to find a string in typelib_t
 * (similar to find_type() of mysys/typelib.c)
 *
 *       SYNOPSIS
 *       find_type()
 *       lib                  typelib_t
 *       find                 String to find
 *       length               Length of string to find
 *       part_match           Allow part matching of value
 *
 *       RETURN
 *       0 error
 *       > 0 position in TYPELIB->type_names +1
 */
static int find_type(typelib_t*  tl,
                     const char* needle,
                     int         maxlen)
{
    int i;

    if (tl == NULL || needle == NULL || maxlen <= 0)
    {
        return -1;
    }

    for (i = 0; i < tl->tl_nelems; i++)
    {
        if (strncasecmp(tl->tl_p_elems[i], needle, maxlen) == 0)
        {
            return i + 1;
        }
    }
    return 0;
}

/**
 * Add qualified config parameter to SERVICE struct.
 */
static void service_add_qualified_param(SERVICE*          svc,
                                        CONFIG_PARAMETER* param)
{
    spinlock_acquire(&svc->spin);

    if (svc->svc_config_param == NULL)
    {
        svc->svc_config_param = config_clone_param(param);
        svc->svc_config_param->next = NULL;
    }
    else
    {
        CONFIG_PARAMETER* p = svc->svc_config_param;
        CONFIG_PARAMETER* prev = NULL;

        while (true)
        {
            CONFIG_PARAMETER* old;

            /** Replace existing parameter in the list, free old */
            if (strncasecmp(param->name,
                            p->name,
                            strlen(param->name)) == 0)
            {
                old = p;
                p = config_clone_param(param);
                p->next = old->next;

                if (prev != NULL)
                {
                    prev->next = p;
                }
                else
                {
                    svc->svc_config_param = p;
                }
                free(old);
                break;
            }
            prev = p;
            p = p->next;

            /** Hit end of the list, add new parameter */
            if (p == NULL)
            {
                p = config_clone_param(param);
                prev->next = p;
                p->next = NULL;
                break;
            }
        }
    }
    /** Increment service's configuration version */
    atomic_add(&svc->svc_config_version, 1);
    spinlock_release(&svc->spin);
}

/**
 * Return the name of the service
 *
 * @param svc           The service
 */
char *
service_get_name(SERVICE *svc)
{
    return svc->name;
}

/**
 * Set the weighting parameter for the service
 *
 * @param       service         The service pointer
 * @param       weightby        The parameter name to weight the routing by
 */
void
serviceWeightBy(SERVICE *service, char *weightby)
{
    if (service->weightby)
    {
        free(service->weightby);
    }
    service->weightby = strdup(weightby);
}

/**
 * Return the parameter the wervice shoudl use to weight connections
 * by
 * @param service               The Service pointer
 */
char *
serviceGetWeightingParameter(SERVICE *service)
{
    return service->weightby;
}

/**
 * Enable/Disable localhost authentication match criteria
 * associated with this service.
 *
 * @param service       The service we are setting the data for
 * @param action        1 for enable, 0 for disable access
 * @return              0 on failure
 */

int
serviceEnableLocalhostMatchWildcardHost(SERVICE *service, int action)
{
    if (action != 0 && action != 1)
    {
        return 0;
    }

    service->localhost_match_wildcard_host = action;

    return 1;
}

void service_shutdown()
{
    SERVICE* svc;
    spinlock_acquire(&service_spin);
    svc = allServices;
    while (svc != NULL)
    {
        svc->svc_do_shutdown = true;
        svc = svc->next;
    }
    spinlock_release(&service_spin);
}

/**
 * Return the count of all sessions active for all services
 *
 * @return Count of all active sessions
 */
int
serviceSessionCountAll()
{
    SERVICE *service;
    int rval = 0;

    spinlock_acquire(&service_spin);
    service = allServices;
    while (service)
    {
        rval += service->stats.n_current;
        service = service->next;
    }
    spinlock_release(&service_spin);
    return rval;
}

/**
 * Provide a row to the result set that defines the set of service
 * listeners
 *
 * @param set   The result set
 * @param data  The index of the row to send
 * @return The next row or NULL
 */
static RESULT_ROW *
serviceListenerRowCallback(RESULTSET *set, void *data)
{
    int *rowno = (int *)data;
    int i = 0;;
    char buf[20];
    RESULT_ROW *row;
    SERVICE *service;
    SERV_LISTENER *lptr = NULL;

    spinlock_acquire(&service_spin);
    service = allServices;
    if (service)
    {
        lptr = service->ports;
    }
    while (i < *rowno && service)
    {
        lptr = service->ports;
        while (i < *rowno && lptr)
        {
            if ((lptr = lptr->next) != NULL)
            {
                i++;
            }
        }
        if (i < *rowno)
        {
            service = service->next;
            if (service && (lptr = service->ports) != NULL)
            {
                i++;
            }
        }
    }
    if (lptr == NULL)
    {
        spinlock_release(&service_spin);
        free(data);
        return NULL;
    }
    (*rowno)++;
    row = resultset_make_row(set);
    resultset_row_set(row, 0, service->name);
    resultset_row_set(row, 1, lptr->protocol);
    resultset_row_set(row, 2, (lptr && lptr->address) ? lptr->address : "*");
    sprintf(buf, "%d", lptr->port);
    resultset_row_set(row, 3, buf);
    resultset_row_set(row, 4,
                      (!lptr->listener || !lptr->listener->session ||
                       lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ?
                      "Stopped" : "Running");
    spinlock_release(&service_spin);
    return row;
}

/**
 * Return a resultset that has the current set of services in it
 *
 * @return A Result set
 */
RESULTSET *
serviceGetListenerList()
{
    RESULTSET *set;
    int *data;

    if ((data = (int *)malloc(sizeof(int))) == NULL)
    {
        return NULL;
    }
    *data = 0;
    if ((set = resultset_create(serviceListenerRowCallback, data)) == NULL)
    {
        free(data);
        return NULL;
    }
    resultset_add_column(set, "Service Name", 25, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Protocol Module", 20, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Address", 15, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Port", 5, COL_TYPE_VARCHAR);
    resultset_add_column(set, "State", 8, COL_TYPE_VARCHAR);

    return set;
}

/**
 * Provide a row to the result set that defines the set of services
 *
 * @param set   The result set
 * @param data  The index of the row to send
 * @return The next row or NULL
 */
static RESULT_ROW *
serviceRowCallback(RESULTSET *set, void *data)
{
    int *rowno = (int *)data;
    int i = 0;
    char buf[20];
    RESULT_ROW *row;
    SERVICE *service;

    spinlock_acquire(&service_spin);
    service = allServices;
    while (i < *rowno && service)
    {
        i++;
        service = service->next;
    }
    if (service == NULL)
    {
        spinlock_release(&service_spin);
        free(data);
        return NULL;
    }
    (*rowno)++;
    row = resultset_make_row(set);
    resultset_row_set(row, 0, service->name);
    resultset_row_set(row, 1, service->routerModule);
    sprintf(buf, "%d", service->stats.n_current);
    resultset_row_set(row, 2, buf);
    sprintf(buf, "%d", service->stats.n_sessions);
    resultset_row_set(row, 3, buf);
    spinlock_release(&service_spin);
    return row;
}

/**
 * Return a result set that has the current set of services in it
 *
 * @return A Result set
 */
RESULTSET *
serviceGetList()
{
    RESULTSET *set;
    int *data;

    if ((data = (int *)malloc(sizeof(int))) == NULL)
    {
        return NULL;
    }
    *data = 0;
    if ((set = resultset_create(serviceRowCallback, data)) == NULL)
    {
        free(data);
        return NULL;
    }
    resultset_add_column(set, "Service Name", 25, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Router Module", 20, COL_TYPE_VARCHAR);
    resultset_add_column(set, "No. Sessions", 10, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Total Sessions", 10, COL_TYPE_VARCHAR);

    return set;
}

/**
 * Function called by the housekeeper thread to retry starting of a service
 * @param data Service to restart
 */
static void service_internal_restart(void *data)
{
    SERVICE* service = (SERVICE*)data;
    serviceStartAllPorts(service);
}

/**
 * Check that all services have listeners
 * @return True if all services have listeners
 */
bool service_all_services_have_listeners()
{
    bool rval = true;
    spinlock_acquire(&service_spin);

    SERVICE* service = allServices;

    while (service)
    {
        if (service->ports == NULL)
        {
            MXS_ERROR("Service '%s' has no listeners.", service->name);
            rval = false;
        }
        service = service->next;
    }

    spinlock_release(&service_spin);
    return rval;
}
